/******************************************************************************
This file is part of AppKit.
Project: appkit
Author : FergusZeng
Email  : cblock@126.com
git	   : https://gitee.com/newgolo/appkit.git
*******************************************************************************
MIT License

Copyright (c) 2022 cblock@126.com

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
******************************************************************************/
#pragma once
#include <sys/prctl.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

#include "appkit/basetype.h"
#include "appkit/strutil.h"
#include "appkit/thread.h"

namespace appkit {
/**
 * @brief Asynchronous callback dispatcher.
 * @tparam T Type of the payload handed to the callbacks.
 * @note A separate worker thread takes queued arguments one at a time, in the
 *       order they were queued, and invokes every registered callback with
 *       the shared pointer to the data. The worker is started lazily by
 *       callAvailable() and asked to stop in the destructor.
 */
template <typename T>
class AsyncCallback : public Runnable {
public:
    using Callback = std::function<void(std::shared_ptr<T>)>;

public:
    /**
     * @brief Construct the dispatcher.
     * @param name Optional prefix for the worker thread name; when empty the
     *             thread name is left untouched by run().
     */
    explicit AsyncCallback(const std::string& name = "") : m_name(name) {}

    /**
     * @brief Ask the worker thread to stop.
     * @note Arguments still waiting in the queue are not dispatched.
     */
    virtual ~AsyncCallback() { m_thread.stop(); }

    /**
     * @brief Register an additional callback.
     * @param callback Callable invoked by the worker thread for every
     *                 non-null argument queued afterwards.
     */
    void addCallback(const Callback& callback) {
        std::lock_guard<std::mutex> lock(m_callbackMutex);
        m_callbacks.emplace_back(callback);
    }

    /**
     * @brief Queue an argument so that all callbacks are run with it.
     * @param arg Argument passed to each callback. A null pointer is queued
     *            but skipped by the worker without invoking any callback.
     * @return true if the argument was queued, false if the worker thread
     *         could not be started.
     */
    bool callAvailable(const std::shared_ptr<T>& arg) {
        // Claim the start atomically: with a plain load-then-store, two
        // concurrent first calls could both reach m_thread.start().
        bool expected = false;
        if (m_startFlag.compare_exchange_strong(expected, true)) {
            if (!m_thread.start(*this)) {
                m_startFlag.store(false);  // allow a later call to retry
                return false;
            }
        }
        {
            std::lock_guard<std::mutex> lock(m_cvMutex);
            // arg is a const reference, so it can only be copied here.
            m_argQueue.push(arg);
        }
        // The notification does not need to be issued under the lock.
        m_cond.notify_all();
        return true;
    }

private:
    /**
     * @brief Worker loop: pop one argument at a time and fan it out to the
     *        registered callbacks.
     * @param thread Thread object driving this runnable; the loop ends once
     *               thread.isRunning() returns false.
     */
    void run(const Thread& thread) final {
        if (!m_name.empty()) {
            auto name =
                StrUtil::format("%s_%x", m_name.data(), Thread::threadID());
            prctl(PR_SET_NAME, name.data());  // set the thread name
        }
        while (thread.isRunning()) {
            std::shared_ptr<T> arg;
            {
                std::unique_lock<std::mutex> lock(m_cvMutex);
                // Bounded wait so that thread.isRunning() is re-checked at
                // least every 10 ms even when nothing is queued.
                if (!m_cond.wait_for(lock, std::chrono::milliseconds(10),
                                     [this] { return !m_argQueue.empty(); })) {
                    continue;
                }
                // The predicate held and the lock is still owned, so the
                // front element can be taken in the same critical section.
                arg = std::move(m_argQueue.front());
                m_argQueue.pop();
            }
            if (!arg) {
                continue;
            }
            // Snapshot the callbacks and release the lock before invoking
            // them: a callback that calls addCallback() would otherwise
            // re-lock m_callbackMutex on the same thread.
            std::vector<Callback> callbacks;
            {
                std::lock_guard<std::mutex> lock(m_callbackMutex);
                callbacks = m_callbacks;
            }
            for (const auto& callback : callbacks) {
                callback(arg);
            }
        }
        // Let a later callAvailable() start the worker again.
        m_startFlag.store(false);
    }

private:
    Thread m_thread{"AsyncCallback"};
    std::string m_name{""};                     // thread-name prefix
    std::atomic<bool> m_startFlag{false};       // true once the worker start is claimed
    std::vector<Callback> m_callbacks;          // guarded by m_callbackMutex
    std::queue<std::shared_ptr<T>> m_argQueue;  // guarded by m_cvMutex
    std::condition_variable m_cond;             // signalled when an argument is queued
    std::mutex m_callbackMutex;
    std::mutex m_cvMutex;
};

/**
 * @brief Asynchronous task queue (thread pool).
 * @note Only the declaration lives in this header; the member functions are
 *       defined elsewhere.
 */
class AsyncTaskQueue {
    DECL_CLASSMETA(AsyncTaskQueue)

public:
    /// Unit of work stored in the queue.
    using Task = std::function<void()>;
    /// Shared-ownership handle to a queue instance.
    using Ptr = std::shared_ptr<AsyncTaskQueue>;

    explicit AsyncTaskQueue(const std::string& name = "");
    virtual ~AsyncTaskQueue();

    /**
     * @brief Initialize the task queue.
     * @param workers Number of worker threads.
     * @return Result of the initialization as reported by the implementation
     *         (presumably true on success -- TODO confirm against the .cpp).
     */
    bool init(int workers = 1);

    /**
     * @brief Put a task into the task queue.
     * @param task Task to enqueue.
     */
    void putTask(const Task& task);

    /**
     * @brief Get the number of tasks currently held in the task queue.
     * @return Task count as uint32.
     */
    uint32 size();

private:
    Task getTask();

    // NOTE: keep the declaration order below; it fixes the construction and
    // destruction order of the members.
    std::string m_name;
    int m_workerNum = 0;
    std::atomic<bool> m_startFlag{false};
    std::queue<Task> m_taskQueue;
    std::vector<std::future<void>> m_workers;
    std::condition_variable m_cond;
    std::mutex m_cvMutex;
};
}  // namespace appkit
